In [1]:
import os
import sys
import glob  # stdlib; used below to find whichever py4j zip this Spark build ships

# Point this kernel at a local Spark installation.
# An already-exported, non-empty SPARK_HOME wins; otherwise fall back to the
# original author's install location (edit the default for your machine).
spark_path = os.environ.get('SPARK_HOME') or "/Users/flavio.clesio/Documents/spark-2.1.0"
os.environ['SPARK_HOME'] = spark_path
os.environ['HADOOP_HOME'] = spark_path

# The bundled py4j zip must match the Spark release, so discover it instead of
# pinning one version; keep the Spark 2.1.0 name (py4j 0.10.4) as the fallback
# when nothing is found on disk.
py4j_candidates = sorted(glob.glob(spark_path + "/python/lib/py4j-*-src.zip"))
py4j_zip = py4j_candidates[-1] if py4j_candidates else spark_path + "/python/lib/py4j-0.10.4-src.zip"

# Make pyspark and py4j importable. Skipping entries that are already present
# keeps sys.path free of duplicates when this cell is re-run.
for spark_entry in (
    spark_path + "/bin",
    spark_path + "/python",
    spark_path + "/python/pyspark/",
    spark_path + "/python/lib",
    spark_path + "/python/lib/pyspark.zip",
    py4j_zip,
):
    if spark_entry not in sys.path:
        sys.path.append(spark_entry)
In [2]:
from pyspark import SparkContext
from pyspark import SparkConf
In [3]:
# Spark configuration for this notebook: "local" master, application name
# "My app", and 4g of executor memory.
conf = SparkConf().setMaster("local").setAppName("My app")
conf = conf.set("spark.executor.memory", "4g")
In [4]:
# Only one SparkContext may be active per JVM, so constructing a new one raises
# when this cell is re-run in a live kernel. getOrCreate returns the running
# context if there is one and otherwise builds it from `conf`.
sc = SparkContext.getOrCreate(conf=conf)
In [ ]:
# Display the SparkContext created above as a quick check that it exists.
sc
In [5]:
# Directory holding the extracted MovieLens 100k files (u.data is read from it).
# Set the MOVIELENS_PATH environment variable to point elsewhere; the default
# is the original author's download location.
ROOT_PATH = os.environ.get('MOVIELENS_PATH', '/Users/flavio.clesio/Downloads/ml-100k')
In [6]:
# Load the raw ratings file as an RDD of text lines (one rating per line).
movielens = sc.textFile(ROOT_PATH + "/u.data")
u.data -- The full u data set, 100000 ratings by 943 users on 1682 items. Each user has rated at least 20 movies. Users and items are numbered consecutively from 1. The data is randomly ordered. This is a tab separated list of user id | item id | rating | timestamp.
In [7]:
# Peek at the first raw line to confirm the file was read.
movielens.first()
Out[7]:
In [8]:
# Number of lines (i.e. ratings) in the raw file.
movielens.count()
Out[8]:
In [9]:
# Tokenise every raw line. Per the MovieLens readme each line is
# tab-separated: user, product, rating, timestamp.
clean_data = movielens.map(lambda line: line.split('\t'))
In [10]:
# Show the first 10 split records.
clean_data.take(10)
Out[10]:
In [11]:
# Example: pull the rating column (third field) into its own RDD of ints.
# rate.first() is 3
rate = clean_data.map(lambda fields: int(fields[2]))
In [12]:
# Mean of all ratings in the file.
rate.mean() #Avg rating is 3.52986
Out[12]:
In [13]:
# User ids (first field) as an RDD of ints.
users = clean_data.map(lambda fields: int(fields[0]))
In [14]:
# Count unique user ids.
users.distinct().count() #943 users
Out[14]:
In [15]:
# An intermediate RDD is optional: map, de-duplicate and count in one chain.
# This counts the distinct movie ids (second field).
# There are 1,682 movies
clean_data.map(lambda fields: int(fields[1])).distinct().count()
Out[15]:
In [16]:
from pyspark.mllib.recommendation import ALS
from pyspark.mllib.recommendation import MatrixFactorizationModel
from pyspark.mllib.recommendation import Rating
In [17]:
# ALS consumes Rating objects, i.e. (user, item, rating) triples.
# First step: split each raw tab-separated line into its fields.
mls = movielens.map(lambda line: line.split('\t'))
In [18]:
# Convert each field list into a Rating; the timestamp (fourth field) is dropped.
ratings = mls.map(
    lambda fields: Rating(user=int(fields[0]), product=int(fields[1]), rating=float(fields[2]))
)
In [19]:
# Hold out roughly 20% of the ratings for evaluation; the fixed seed makes the
# split repeatable.
train, test = ratings.randomSplit(weights=[0.8, 0.2], seed=7856)
In [20]:
# Report the training-set size. A single pre-formatted string inside print(...)
# prints the same text on Python 2 and Python 3 (the bare print statement is a
# SyntaxError on Python 3); the "traning" typo in the label is also fixed.
print('The number of training instances is: %d' % train.count())
In [21]:
# Report the test-set size. The original label said "traning instances" even
# though the value printed is test.count(); the label now matches the value.
# The single-string print(...) form runs on both Python 2 and Python 3.
print('The number of test instances is: %d' % test.count())
In [22]:
#Need to cache the data to speed up training
#As the cell's last expression, the value returned by cache() is displayed.
train.cache()
Out[22]:
In [23]:
# Cache the held-out set as well; it is reused by the test-set evaluation.
test.cache()
Out[23]:
In [24]:
#Setting up the parameters for ALS
#Both values are passed positionally to ALS.train when the model is built.
rank = 5 # Latent Factors to be made
numIterations = 10 # Times to repeat process
In [25]:
#Create the model on the training data.
#Without an explicit seed ALS picks a fresh random one on every run, so the
#fitted factors and every metric derived from them change between runs.
#Pin it (same value the train/test split uses) so results are repeatable.
model = ALS.train(train, rank, numIterations, seed=7856)
In [26]:
#Examine the latent features for one product
#first() returns a single element of the productFeatures() RDD.
model.productFeatures().first()
Out[26]:
In [27]:
#Examine the latent features for one user
#first() returns a single element of the userFeatures() RDD.
model.userFeatures().first()
Out[27]:
In [28]:
# For Product X, Find N Users to Sell To
# Here X = product id 242 and N = 100.
model.recommendUsers(242,100)
Out[28]:
In [29]:
# For User Y Find N Products to Promote
# Here Y = user id 196 and N = 10.
model.recommendProducts(196,10)
Out[29]:
In [30]:
#Predict Single Product for Single User
#Arguments are (user id, product id): user 196, product 242.
model.predict(196, 242)
Out[30]:
In [31]:
# Batch prediction, step 1: reduce each training Rating to the
# (user, product) pair that predictAll expects as input.
pred_input = train.map(lambda r: (r.user, r.product))
In [32]:
# Lots of Predictions
#Returns Ratings(user, item, prediction)
# Scores every (user, product) pair in pred_input, i.e. the training pairs.
pred = model.predictAll(pred_input)
In [33]:
# Performance estimate on the training data.
# Re-key both RDDs as ((user, product), rating) so they can be joined on the
# (user, product) pair.
true_reorg = train.map(lambda r: ((r.user, r.product), r.rating))
pred_reorg = pred.map(lambda r: ((r.user, r.product), r.rating))
In [34]:
#Do the actual join
#Result elements are ((user, product), (true rating, predicted rating)).
true_pred = true_reorg.join(pred_reorg)
In [35]:
#Need to be able to square root the Mean-Squared Error
from math import sqrt
In [36]:
# Training-set error: mean of squared (true - predicted) differences over the
# joined pairs, then its square root.
MSE = true_pred.values().map(lambda pair: (pair[0] - pair[1]) ** 2).mean()
RMSE = sqrt(MSE)  # author's recorded run: 0.7629908117414474
In [37]:
#Test Set Evaluation
#Same pipeline as the training-set estimate, applied to the held-out ratings.
test_input = test.map(lambda x:(x[0],x[1]))            # (user, product) pairs to score
pred_test = model.predictAll(test_input)               # Ratings(user, product, prediction)
test_reorg = test.map(lambda x:((x[0],x[1]), x[2]))    # key the true ratings by (user, product)
#Use a dedicated name for the keyed test predictions. The training join reads
#pred_reorg, so rebinding it here would make a later re-run of that cell
#silently join training truth against test predictions.
pred_test_reorg = pred_test.map(lambda x:((x[0],x[1]), x[2]))
test_pred = test_reorg.join(pred_test_reorg)           # ((user, product), (true, predicted))
test_MSE = test_pred.map(lambda r: (r[1][0] - r[1][1])**2).mean()
test_RMSE = sqrt(test_MSE)#author's recorded run: 1.0145549956596238
In [38]:
#If you're happy, save your model!
#model.save(sc,ROOT_PATH + "/ml-model")
#sameModel = MatrixFactorizationModel.load(sc, ROOT_PATH + "/ml-model")
In [43]:
# Display the RMSE computed from true_pred (the training-set join); the
# held-out figure is stored in test_RMSE.
RMSE
Out[43]: